package org.adam2.chamuel.sink.agent;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.flume.*;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class StockRecordAgentSink extends AbstractSink implements Configurable {
    private String hostname;
    private Integer port;
    private Properties clientProps;
    private RpcClient client;
    private final static int batchSize = 1000;

    public Status process() throws EventDeliveryException {
    	System.out.println("agent source process");
        Status status = Status.READY;
        Channel channel = this.getChannel();
        Transaction transaction = channel.getTransaction();

        try {
            transaction.begin();
            List<Event> batch = Lists.newLinkedList();

            int size;
            for(size = 0; size < batchSize; ++size) {
                Event event = channel.take();
                if (event == null) {
                    break;
                }
                batch.add(event);
            }

            size = batch.size();
            if (size == 0) {
                status = Status.BACKOFF;
            } else {
                System.out.println("agent sink batch size:  "+batch.size());
                this.client.appendBatch(batch);
            }

            transaction.commit();
        } catch (Throwable var10) {
            transaction.rollback();
            if (var10 instanceof Error) {
                throw (Error)var10;
            }

            if (!(var10 instanceof ChannelException)) {
                throw new EventDeliveryException("Failed to send events", var10);
            }

            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }

        return status;
    }

    public void configure(Context context) {
        this.clientProps = new Properties();
        this.hostname = context.getString("hostname");
        this.port = context.getInteger("port");
        Preconditions.checkState(this.hostname != null, "No hostname specified");
        Preconditions.checkState(this.port != null, "No port specified");
        this.clientProps.setProperty("hosts", "h1");
        this.clientProps.setProperty("hosts.h1", this.hostname + ":" + this.port);
        UnmodifiableIterator var2 = context.getParameters().entrySet().iterator();

        while (var2.hasNext()) {
            Map.Entry<String, String> entry = (Map.Entry) var2.next();
            this.clientProps.setProperty((String) entry.getKey(), (String) entry.getValue());
        }

        client = RpcClientFactory.getInstance(clientProps);
    }
}
